/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package software.amazon.awssdk.enhanced.dynamodb.extensions;

import java.time.Clock;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.NotThreadSafe;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.annotations.ThreadSafe;
import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClientExtension;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbExtensionContext;
import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticAttributeTag;
import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableMetadata;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.utils.Validate;

/**
 * This extension enables selected attributes to be automatically updated with a current timestamp every time they are written
 * to the database.
 * <p>
 *     This extension is not loaded by default when you instantiate a
 *     {@link software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient}. You therefore need to register it explicitly
 *     as a custom extension when creating the enhanced client.
 *     <p>
 *         Example to add AutoGeneratedTimestampRecordExtension along with default extensions is
 *         <code>DynamoDbEnhancedClient.builder().extensions(Stream.concat(ExtensionResolver.defaultExtensions().stream(),
 *         Stream.of(AutoGeneratedTimestampRecordExtension.create())).collect(Collectors.toList())).build();</code>
 *     </p>
 *     <p>
 *         Example to just add AutoGeneratedTimestampRecordExtension without default extensions is
 *         <code>DynamoDbEnhancedClient.builder().extensions(AutoGeneratedTimestampRecordExtension.create()).build();</code>
 *     </p>
 * </p>
 * <p>
 *     To utilize auto generated timestamp update, first create a field in your model that will be used to store the record
 *     timestamp of modification. This class field must be an {@link Instant} Class type, and you need to tag it as the
 *     autoGeneratedTimeStampAttribute. If you are using the
 *     {@link software.amazon.awssdk.enhanced.dynamodb.mapper.BeanTableSchema}
 *     then you should use the
 *     {@link software.amazon.awssdk.enhanced.dynamodb.extensions.annotations.DynamoDbAutoGeneratedTimestampAttribute}
 *     annotation, otherwise if you are using the {@link software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema}
 *     then you should use the {@link AttributeTags#autoGeneratedTimestampAttribute()} static attribute tag.
 * <p>
 * Every time the record is written to the database, the tagged attributes are automatically set to the timestamp at which
 * the write was prepared. This extension applies the conversions as defined in the attribute converter.
 */
@SdkPublicApi
@ThreadSafe
public final class AutoGeneratedTimestampRecordExtension implements DynamoDbEnhancedClientExtension {
    /** Key under which the names of all tagged timestamp attributes are stored in the table's custom metadata. */
    private static final String CUSTOM_METADATA_KEY = "AutoGeneratedTimestampExtension:AutoGeneratedTimestampAttribute";

    /** Single shared tag instance handed out by {@link AttributeTags#autoGeneratedTimestampAttribute()}. */
    private static final AutoGeneratedTimestampAttribute
        AUTO_GENERATED_TIMESTAMP_ATTRIBUTE = new AutoGeneratedTimestampAttribute();

    /** Source of the instant that is written into each tagged attribute. */
    private final Clock clock;

    /**
     * Creates an extension that reads the time from {@link Clock#systemUTC()}.
     */
    private AutoGeneratedTimestampRecordExtension() {
        this.clock = Clock.systemUTC();
    }

    /**
     * Creates an extension from a builder, falling back to {@link Clock#systemUTC()} when the builder holds no clock.
     */
    private AutoGeneratedTimestampRecordExtension(Builder builder) {
        Clock configuredClock = builder.baseClock;
        this.clock = configuredClock != null ? configuredClock : Clock.systemUTC();
    }

    /**
     * Holder for the static attribute tags understood by {@link AutoGeneratedTimestampRecordExtension}.
     */
    public static final class AttributeTags {

        private AttributeTags() {
        }

        /**
         * Tag that marks an attribute as one to be handled by the Auto Generated Timestamp Record Extension.
         *
         * @return the shared tag instance for auto generated timestamp attributes
         */
        public static StaticAttributeTag autoGeneratedTimestampAttribute() {
            return AUTO_GENERATED_TIMESTAMP_ATTRIBUTE;
        }
    }

    /**
     * Creates an extension instance that uses {@link Clock#systemUTC()} as its time source.
     *
     * @return a new {@link AutoGeneratedTimestampRecordExtension}
     */
    public static AutoGeneratedTimestampRecordExtension create() {
        return new AutoGeneratedTimestampRecordExtension();
    }

    /**
     * Create a builder that can be used to create a {@link AutoGeneratedTimestampRecordExtension}.
     *
     * @return a new, empty builder
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Returns a builder initialized with all existing values on the Extension object.
     *
     * @return a new builder whose base clock is the clock used by this extension
     */
    public Builder toBuilder() {
        return new Builder().baseClock(clock);
    }

    /**
     * Overwrites every tagged timestamp attribute of the item being written with the current instant of the clock.
     * <p>
     * The attribute names are read from the table metadata under the extension's custom metadata key. When no such
     * entry exists an empty {@link WriteModification} is returned. Otherwise a copy of the item is made, each named
     * attribute is replaced by the clock's instant converted with the table schema's converter for that attribute, and
     * the copy is returned as the transformed item wrapped in an unmodifiable map.
     *
     * @param context The {@link DynamoDbExtensionContext.BeforeWrite} context containing the state of the execution.
     * @return a {@link WriteModification} carrying the transformed item, or an empty one when nothing is tagged
     */
    @Override
    public WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite context) {
        Collection<String> timestampAttributeNames =
            context.tableMetadata().customMetadataObject(CUSTOM_METADATA_KEY, Collection.class).orElse(null);

        // No attribute of this table carries the tag: leave the item untouched.
        if (timestampAttributeNames == null) {
            return WriteModification.builder().build();
        }

        Map<String, AttributeValue> stampedItem = new HashMap<>(context.items());
        for (String attributeName : timestampAttributeNames) {
            // The clock is read once per attribute, after that attribute's converter has been looked up.
            AttributeConverter converter = context.tableSchema().converterForAttribute(attributeName);
            stampedItem.put(attributeName, converter.transformFrom(clock.instant()));
        }

        return WriteModification.builder()
                                .transformedItem(Collections.unmodifiableMap(stampedItem))
                                .build();
    }

    /**
     * Builder for a {@link AutoGeneratedTimestampRecordExtension}
     */
    @NotThreadSafe
    public static final class Builder {

        /** Clock to hand to the built extension; {@code null} means "use {@link Clock#systemUTC()}". */
        private Clock baseClock;

        private Builder() {
        }

        /**
         * Sets the clock that the extension reads every time it generates a timestamp. If no clock (or {@code null})
         * is supplied, {@link Clock#systemUTC()} is used.
         *
         * @param clock Clock instance used to obtain the current timestamp.
         * @return This builder for method chaining.
         */
        public Builder baseClock(Clock clock) {
            this.baseClock = clock;
            return this;
        }

        /**
         * Builds an {@link AutoGeneratedTimestampRecordExtension} based on the values stored in this builder.
         *
         * @return the configured extension
         */
        public AutoGeneratedTimestampRecordExtension build() {
            return new AutoGeneratedTimestampRecordExtension(this);
        }
    }

    /**
     * Static attribute tag that restricts tagged attributes to {@link Instant} and records their names in the table
     * metadata so that {@code beforeWrite} can find them.
     */
    private static class AutoGeneratedTimestampAttribute implements StaticAttributeTag {

        /**
         * Checks that the tagged attribute is declared as {@link Instant}.
         *
         * @param attributeName name of the tagged attribute, used in the error message
         * @param type enhanced type of the attribute; it and its raw class must not be null
         * @param attributeValueType DynamoDB value type of the attribute; must not be null
         * @throws IllegalArgumentException if the raw class of {@code type} is not {@link Instant}
         */
        @Override
        public <R> void validateType(String attributeName, EnhancedType<R> type,
                                     AttributeValueType attributeValueType) {

            Validate.notNull(type, "type is null");
            Validate.notNull(type.rawClass(), "rawClass is null");
            Validate.notNull(attributeValueType, "attributeValueType is null");

            if (Instant.class.equals(type.rawClass())) {
                return;
            }

            String message = String.format(
                "Attribute '%s' of Class type %s is not a suitable Java Class type to be used as a Auto Generated "
                + "Timestamp attribute. Only java.time.Instant Class type is supported.", attributeName, type.rawClass());
            throw new IllegalArgumentException(message);
        }

        /**
         * Produces the metadata modification for a tagged attribute: its name is added under the extension's custom
         * metadata key, then {@code markAttributeAsKey} is invoked for it with the given value type.
         *
         * @param attributeName name of the tagged attribute
         * @param attributeValueType DynamoDB value type of the attribute
         * @return a consumer that applies both changes to a {@link StaticTableMetadata.Builder}
         */
        @Override
        public Consumer<StaticTableMetadata.Builder> modifyMetadata(String attributeName,
                                                                    AttributeValueType attributeValueType) {
            return metadataBuilder ->
                metadataBuilder.addCustomMetadataObject(CUSTOM_METADATA_KEY, Collections.singleton(attributeName))
                               .markAttributeAsKey(attributeName, attributeValueType);
        }
    }
}
